{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Transformation with Amazon a SageMaker Processing Job and Scikit-Learn\n",
    "\n",
    "Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.\n",
    "\n",
    "Often, distributed data processing frameworks such as Scikit-Learn are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Scikit-Learn in a managed SageMaker environment to run our processing workload."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](img/prepare_dataset.png)\n",
    "\n",
    "![](img/processing.jpg)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Contents\n",
    "\n",
    "1. Setup Environment\n",
    "1. Setup Input Data\n",
    "1. Setup Output Data\n",
    "1. Build a Spark container for running the processing job\n",
    "1. Run the Processing Job using Amazon SageMaker\n",
    "1. Inspect the Processed Output Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Environment\n",
    "\n",
    "Let's start by specifying:\n",
    "* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.\n",
    "* The IAM role ARN used to give processing and training access to the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install boto3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "from time import gmtime, strftime\n",
    "import boto3\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Input Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Inputs\n",
    "s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)\n",
    "print(s3_input_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!aws s3 ls $s3_input_data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Output Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n",
    "\n",
    "output_prefix = 'amazon-reviews-scikit-processor-{}'.format(timestamp_prefix)\n",
    "scikit_processing_job_name = 'amazon-reviews-scikit-processor-{}'.format(timestamp_prefix)\n",
    "\n",
    "print('Processing job name:  {}'.format(scikit_processing_job_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run the Processing Job using Amazon SageMaker\n",
    "\n",
    "Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a SparkML script for processing in the job configuration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Review the Spark processing script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cat preprocess-scikit-label-split.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run this script as a processing job.  You also need to specify one `ProcessingInput` with the `source` argument of the Amazon S3 bucket and `destination` is where the script reads this data from `/opt/ml/processing/input` (inside the Docker container.)  All local paths inside the processing container must begin with `/opt/ml/processing/`.\n",
    "\n",
    "Also give the `run()` method a `ProcessingOutput`, where the `source` is the path the script writes output data to.  For outputs, the `destination` defaults to an S3 bucket that the Amazon SageMaker Python SDK creates for you, following the format `s3://sagemaker-<region>-<account_id>/<processing_job_name>/output/<output_name>/`.  You also give the `ProcessingOutput` value for `output_name`, to make it easier to retrieve these output artifacts after the job is run.\n",
    "\n",
    "The arguments parameter in the `run()` method are command-line arguments in our `preprocess-*.py` script.\n",
    "\n",
    "Note that we sharding the data using `ShardedS3Key` to spread the transformations across all worker nodes in the cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.sklearn.processing import SKLearnProcessor\n",
    "from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
    "\n",
    "processor = SKLearnProcessor(framework_version='0.20.0',\n",
    "                             role=role,\n",
    "                             instance_type='ml.m5.4xlarge',\n",
    "                             instance_count=2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "processor.run(code='preprocess-scikit-label-split.py',\n",
    "                      inputs=[ProcessingInput(source=s3_input_data,\n",
    "                                              destination='/opt/ml/processing/input/data/',\n",
    "                                              s3_data_distribution_type='ShardedByS3Key')],\n",
    "                      outputs=[\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-train',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/train'),\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-validation',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/validation'),\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-test',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/test'),\n",
    "                      ],\n",
    "                      logs=True,\n",
    "                      wait=False)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "scikit_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']\n",
    "\n",
    "from IPython.core.display import display, HTML\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix\">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, scikit_processing_job_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "# Our job writes to `processing_job_name` since we are using ProcessingOutput above\n",
    "scikit_processing_job_s3_output_prefix = scikit_processing_job_name\n",
    "\n",
    "display(HTML('<b>Review <a href=\"https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview\">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, scikit_processing_job_s3_output_prefix, region)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Please Wait Until the Processing Job Completes\n",
    "Re-run this next cell until the job status shows `Completed`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=scikit_processing_job_name,\n",
    "                                                                            sagemaker_session=sagemaker_session)\n",
    "\n",
    "processing_job_description = running_processor.describe()\n",
    "\n",
    "processing_job_status = processing_job_description['ProcessingJobStatus']\n",
    "print('\\n')\n",
    "print(processing_job_status)\n",
    "print('\\n')\n",
    "\n",
    "print(processing_job_description)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Inspect the Processed Output Data\n",
    "\n",
    "## The next cells will not work properly until the job completes above.\n",
    "\n",
    "Take a look at a few rows of the transformed dataset to make sure the processing was successful."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "output_config = processing_job_description['ProcessingOutputConfig']\n",
    "for output in output_config['Outputs']:\n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-train':\n",
    "        processed_balanced_train_data = output['S3Output']['S3Uri']\n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-validation':\n",
    "        processed_balanced_validation_data = output['S3Output']['S3Uri']        \n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-test':\n",
    "        processed_balanced_test_data = output['S3Output']['S3Uri']\n",
    "        \n",
    "print(processed_balanced_train_data)\n",
    "print(processed_balanced_validation_data)\n",
    "print(processed_balanced_test_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_train_data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_validation_data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_test_data/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pass `scikit_processing_job_s3_output_prefix` above as input to the next notebook"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(scikit_processing_job_s3_output_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store scikit_processing_job_s3_output_prefix\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
